fetcher: Split lowlevel API into file/membuf variants
authorColin Walters <walters@verbum.org>
Wed, 28 Dec 2016 19:43:28 +0000 (14:43 -0500)
committerAtomic Bot <atomic-devel@projectatomic.io>
Wed, 4 Jan 2017 16:32:11 +0000 (16:32 +0000)
The previous commit introduced a single low level API - however,
we can do things in a more optimal way for the curl backend if
we drop the "streaming API" variant.  Currently, we only use
it to synchronously splice into a memory buffer...which is pretty
silly when we could just do that in the backend.

The only tweak here is that we have an "add NUL character" flag that is
(possibly) needed when fetching into a membuf.

The code here ends up being better I think, since we avoid the double return
value for the `_finish()` invocation, and now most of the fetcher code (in the
soup case) writes to a `GOutputStream` consistently.

This will again make things easier for a curl backend.

Closes: #636
Approved by: jlebon

src/libostree/ostree-fetcher-util.c
src/libostree/ostree-fetcher.c
src/libostree/ostree-fetcher.h
src/libostree/ostree-repo-pull.c

index bc97674d9e37f69ecf1e679f911c0232bd8d2649..b8af972ad11398ac8f8700883be901b6fa5f1865 100644 (file)
@@ -28,7 +28,7 @@
 
 typedef struct
 {
-  GInputStream   *result_stream;
+  GBytes          *result_buf;
   gboolean         done;
   GError         **error;
 }
@@ -41,9 +41,9 @@ fetch_uri_sync_on_complete (GObject        *object,
 {
   FetchUriSyncData *data = user_data;
 
-  (void)_ostree_fetcher_request_finish ((OstreeFetcher*)object,
-                                        result, NULL, &data->result_stream,
-                                        data->error);
+  (void)_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)object,
+                                                  result, &data->result_buf,
+                                                  data->error);
   data->done = TRUE;
 }
 
@@ -59,13 +59,11 @@ _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher  *fetcher,
                                             GError         **error)
 {
   gboolean ret = FALSE;
-  const guint8 nulchar = 0;
-  g_autoptr(GMemoryOutputStream) buf = NULL;
   g_autoptr(GMainContext) mainctx = NULL;
   FetchUriSyncData data;
   g_assert (error != NULL);
 
-  data.result_stream = NULL;
+  memset (&data, 0, sizeof (data));
 
   if (g_cancellable_set_error_if_cancelled (cancellable, error))
     return FALSE;
@@ -76,13 +74,14 @@ _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher  *fetcher,
   data.done = FALSE;
   data.error = error;
 
-  _ostree_fetcher_request_async (fetcher, mirrorlist, filename, 0, max_size,
-                                 OSTREE_FETCHER_DEFAULT_PRIORITY, cancellable,
-                                 fetch_uri_sync_on_complete, &data);
+  _ostree_fetcher_request_to_membuf (fetcher, mirrorlist, filename,
+                                     add_nul ? OSTREE_FETCHER_REQUEST_NUL_TERMINATION : 0,
+                                     max_size, OSTREE_FETCHER_DEFAULT_PRIORITY,
+                                     cancellable, fetch_uri_sync_on_complete, &data);
   while (!data.done)
     g_main_context_iteration (mainctx, TRUE);
 
-  if (!data.result_stream)
+  if (!data.result_buf)
     {
       if (allow_noent)
         {
@@ -96,27 +95,12 @@ _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher  *fetcher,
       goto out;
     }
 
-  buf = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-  if (g_output_stream_splice ((GOutputStream*)buf, data.result_stream,
-                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
-                              cancellable, error) < 0)
-    goto out;
-
-  if (add_nul)
-    {
-      if (!g_output_stream_write ((GOutputStream*)buf, &nulchar, 1, cancellable, error))
-        goto out;
-    }
-
-  if (!g_output_stream_close ((GOutputStream*)buf, cancellable, error))
-    goto out;
-
   ret = TRUE;
-  *out_contents = g_memory_output_stream_steal_as_bytes (buf);
+  *out_contents = g_steal_pointer (&data.result_buf);
  out:
   if (mainctx)
     g_main_context_pop_thread_default (mainctx);
-  g_clear_object (&(data.result_stream));
+  g_clear_pointer (&data.result_buf, (GDestroyNotify)g_bytes_unref);
   return ret;
 }
 
index 7917f729de4fe59e2a44c55e17d570ad960cb6d2..cee33186cbe8a6eb30e8ad0c48faf38fa2157fa6 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "config.h"
 
+#include <gio/gio.h>
 #include <gio/gfiledescriptorbased.h>
 #include <gio/gunixoutputstream.h>
 #define LIBSOUP_USE_UNSTABLE_REQUEST_API
@@ -90,7 +91,8 @@ typedef struct {
 
   SoupRequest *request;
 
-  gboolean is_stream;
+  gboolean is_membuf;
+  OstreeFetcherRequestFlags flags;
   GInputStream *request_body;
   char *out_tmpfile;
   GOutputStream *out_stream;
@@ -468,7 +470,7 @@ session_thread_request_uri (ThreadClosure *thread_closure,
         soup_message_headers_append (msg->request_headers, key, value);
     }
 
-  if (pending->is_stream)
+  if (pending->is_membuf)
     {
       soup_request_send_async (pending->request,
                                cancellable,
@@ -855,6 +857,16 @@ finish_stream (OstreeFetcherPendingURI *pending,
    */
   if (pending->out_stream)
     {
+      if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
+        {
+          const guint8 nulchar = 0;
+          gsize bytes_written;
+
+          if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
+                                          cancellable, error))
+            goto out;
+        }
+
       if (!g_output_stream_close (pending->out_stream, cancellable, error))
         goto out;
 
@@ -864,30 +876,37 @@ finish_stream (OstreeFetcherPendingURI *pending,
       g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
     }
 
-  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
-  if (fstatat (pending->thread_closure->tmpdir_dfd,
-               pending->out_tmpfile,
-               &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
+  if (!pending->is_membuf)
     {
-      glnx_set_error_from_errno (error);
-      goto out;
+      if (fstatat (pending->thread_closure->tmpdir_dfd,
+                   pending->out_tmpfile,
+                   &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
+        {
+          glnx_set_error_from_errno (error);
+          goto out;
+        }
     }
 
+  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+
   /* Now that we've finished downloading, continue with other queued
    * requests.
    */
   session_thread_process_pending_queue (pending->thread_closure);
 
-  if (stbuf.st_size < pending->content_length)
+  if (!pending->is_membuf)
     {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
-      goto out;
-    }
-  else
-    {
-      g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
-      pending->thread_closure->total_downloaded += stbuf.st_size;
-      g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+      if (stbuf.st_size < pending->content_length)
+        {
+          g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
+          goto out;
+        }
+      else
+        {
+          g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+          pending->thread_closure->total_downloaded += stbuf.st_size;
+          g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+        }
     }
 
   ret = TRUE;
@@ -973,9 +992,18 @@ on_stream_read (GObject        *object,
     {
       if (!finish_stream (pending, cancellable, &local_error))
         goto out;
-      g_task_return_pointer (task,
-                             g_strdup (pending->out_tmpfile),
-                             (GDestroyNotify) g_free);
+      if (pending->is_membuf)
+        {
+          g_task_return_pointer (task,
+                                 g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
+                                 (GDestroyNotify) g_bytes_unref);
+        }
+      else
+        {
+          g_task_return_pointer (task,
+                                 g_strdup (pending->out_tmpfile),
+                                 (GDestroyNotify) g_free);
+        }
       remove_pending_rerun_queue (pending);
     }
   else
@@ -1045,23 +1073,15 @@ on_request_sent (GObject        *object,
   if (SOUP_IS_REQUEST_HTTP (object))
     {
       msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
-      if (msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
+      if (!pending->is_membuf &&
+          msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
         {
           // We already have the whole file, so just use it.
           pending->state = OSTREE_FETCHER_STATE_COMPLETE;
           (void) g_input_stream_close (pending->request_body, NULL, NULL);
-          if (pending->is_stream)
-            {
-              g_task_return_pointer (task,
-                                     g_object_ref (pending->request_body),
-                                     (GDestroyNotify) g_object_unref);
-            }
-          else
-            {
-              g_task_return_pointer (task,
-                                     g_strdup (pending->out_tmpfile),
-                                     (GDestroyNotify) g_free);
-            }
+          g_task_return_pointer (task,
+                                 g_strdup (pending->out_tmpfile),
+                                 (GDestroyNotify) g_free);
           remove_pending_rerun_queue (pending);
           goto out;
         }
@@ -1126,7 +1146,7 @@ on_request_sent (GObject        *object,
   
   pending->content_length = soup_request_get_content_length (pending->request);
 
-  if (!pending->is_stream)
+  if (!pending->is_membuf)
     {
       int oflags = O_CREAT | O_WRONLY | O_CLOEXEC;
       int fd;
@@ -1147,26 +1167,23 @@ on_request_sent (GObject        *object,
           goto out;
         }
       pending->out_stream = g_unix_output_stream_new (fd, TRUE);
-
-      g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
-      g_hash_table_add (pending->thread_closure->output_stream_set,
-                        g_object_ref (pending->out_stream));
-      g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
-
-      g_input_stream_read_bytes_async (pending->request_body,
-                                       8192, G_PRIORITY_DEFAULT,
-                                       cancellable,
-                                       on_stream_read,
-                                       g_object_ref (task));
     }
   else
     {
-      g_task_return_pointer (task,
-                             g_object_ref (pending->request_body),
-                             (GDestroyNotify) g_object_unref);
-      remove_pending_rerun_queue (pending);
+      pending->out_stream = g_memory_output_stream_new_resizable ();
     }
-  
+
+  g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+  g_hash_table_add (pending->thread_closure->output_stream_set,
+                    g_object_ref (pending->out_stream));
+  g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+
+  g_input_stream_read_bytes_async (pending->request_body,
+                                   8192, G_PRIORITY_DEFAULT,
+                                   cancellable,
+                                   on_stream_read,
+                                   g_object_ref (task));
+
  out:
   if (local_error)
     {
@@ -1179,11 +1196,12 @@ on_request_sent (GObject        *object,
   g_object_unref (task);
 }
 
-void
+static void
 _ostree_fetcher_request_async (OstreeFetcher         *self,
                                GPtrArray             *mirrorlist,
                                const char            *filename,
                                OstreeFetcherRequestFlags flags,
+                               gboolean               is_membuf,
                                guint64                max_size,
                                int                    priority,
                                GCancellable          *cancellable,
@@ -1203,8 +1221,9 @@ _ostree_fetcher_request_async (OstreeFetcher         *self,
   pending->thread_closure = thread_closure_ref (self->thread_closure);
   pending->mirrorlist = g_ptr_array_ref (mirrorlist);
   pending->filename = g_strdup (filename);
+  pending->flags = flags;
   pending->max_size = max_size;
-  pending->is_stream = (flags & OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL) == 0;
+  pending->is_membuf = is_membuf;
 
   task = g_task_new (self, cancellable, callback, user_data);
   g_task_set_source_tag (task, _ostree_fetcher_request_async);
@@ -1219,12 +1238,26 @@ _ostree_fetcher_request_async (OstreeFetcher         *self,
                            (GDestroyNotify) g_object_unref);
 }
 
+void
+_ostree_fetcher_request_to_tmpfile (OstreeFetcher         *self,
+                                    GPtrArray             *mirrorlist,
+                                    const char            *filename,
+                                    guint64                max_size,
+                                    int                    priority,
+                                    GCancellable          *cancellable,
+                                    GAsyncReadyCallback    callback,
+                                    gpointer               user_data)
+{
+  _ostree_fetcher_request_async (self, mirrorlist, filename, 0, FALSE,
+                                 max_size, priority, cancellable,
+                                 callback, user_data);
+}
+
 gboolean
-_ostree_fetcher_request_finish (OstreeFetcher         *self,
-                                GAsyncResult          *result,
-                                char                 **out_filename,
-                                GInputStream         **out_stream,
-                                GError               **error)
+_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
+                                           GAsyncResult  *result,
+                                           char         **out_filename,
+                                           GError       **error)
 {
   GTask *task;
   OstreeFetcherPendingURI *pending;
@@ -1233,12 +1266,6 @@ _ostree_fetcher_request_finish (OstreeFetcher         *self,
   g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
   g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
 
-  /* Special dance to implement
-    enum FetchResult {
-       Filename(String path),
-       Membuf(uint8[])
-    } in Rust terms
-  */
   task = (GTask*)result;
   pending = g_task_get_task_data (task);
 
@@ -1246,20 +1273,57 @@ _ostree_fetcher_request_finish (OstreeFetcher         *self,
   if (!ret)
     return FALSE;
 
-  if (pending->is_stream)
-    {
-      g_assert (out_stream);
-      *out_stream = ret;
-    }
-  else
-    {
-      g_assert (out_filename);
-      *out_filename = ret;
-    }
+  g_assert (!pending->is_membuf);
+  g_assert (out_filename);
+  *out_filename = ret;
+
+  return TRUE;
+}
+
+void
+_ostree_fetcher_request_to_membuf (OstreeFetcher         *self,
+                                   GPtrArray             *mirrorlist,
+                                   const char            *filename,
+                                   OstreeFetcherRequestFlags flags,
+                                   guint64                max_size,
+                                   int                    priority,
+                                   GCancellable          *cancellable,
+                                   GAsyncReadyCallback    callback,
+                                   gpointer               user_data)
+{
+  _ostree_fetcher_request_async (self, mirrorlist, filename, flags, TRUE,
+                                 max_size, priority, cancellable,
+                                 callback, user_data);
+}
+
+gboolean
+_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
+                                          GAsyncResult  *result,
+                                          GBytes       **out_buf,
+                                          GError       **error)
+{
+  GTask *task;
+  OstreeFetcherPendingURI *pending;
+  gpointer ret;
+
+  g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
+  g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
+
+  task = (GTask*)result;
+  pending = g_task_get_task_data (task);
+
+  ret = g_task_propagate_pointer (task, error);
+  if (!ret)
+    return FALSE;
+
+  g_assert (pending->is_membuf);
+  g_assert (out_buf);
+  *out_buf = ret;
 
   return TRUE;
 }
 
+
 guint64
 _ostree_fetcher_bytes_transferred (OstreeFetcher       *self)
 {
index 10ec51dc28b3c0af8749530bc61948c361824146..a57907e429fffa67436efafb8fbf7ee00adb3ed5 100644 (file)
@@ -103,25 +103,39 @@ void _ostree_fetcher_set_extra_headers (OstreeFetcher *self,
 
 guint64 _ostree_fetcher_bytes_transferred (OstreeFetcher       *self);
 
+void _ostree_fetcher_request_to_tmpfile (OstreeFetcher         *self,
+                                         GPtrArray             *mirrorlist,
+                                         const char            *filename,
+                                         guint64                max_size,
+                                         int                    priority,
+                                         GCancellable          *cancellable,
+                                         GAsyncReadyCallback    callback,
+                                         gpointer               user_data);
+
+gboolean _ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
+                                                    GAsyncResult  *result,
+                                                    char         **out_filename,
+                                                    GError       **error);
+
 typedef enum {
-  OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL = (1 << 0)
+  OSTREE_FETCHER_REQUEST_NUL_TERMINATION = (1 << 0)
 } OstreeFetcherRequestFlags;
 
-void _ostree_fetcher_request_async (OstreeFetcher         *self,
-                                    GPtrArray             *mirrorlist,
-                                    const char            *filename,
-                                    OstreeFetcherRequestFlags flags,
-                                    guint64                max_size,
-                                    int                    priority,
-                                    GCancellable          *cancellable,
-                                    GAsyncReadyCallback    callback,
-                                    gpointer               user_data);
-
-gboolean _ostree_fetcher_request_finish (OstreeFetcher *self,
-                                         GAsyncResult  *result,
-                                         char         **out_filename,
-                                         GInputStream **out_stream,
-                                         GError       **error);
+void _ostree_fetcher_request_to_membuf (OstreeFetcher         *self,
+                                        GPtrArray             *mirrorlist,
+                                        const char            *filename,
+                                        OstreeFetcherRequestFlags flags,
+                                        guint64                max_size,
+                                        int                    priority,
+                                        GCancellable          *cancellable,
+                                        GAsyncReadyCallback    callback,
+                                        gpointer               user_data);
+
+gboolean _ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
+                                                   GAsyncResult  *result,
+                                                   GBytes       **out_buf,
+                                                   GError       **error);
+
 
 G_END_DECLS
 
index 100d93048609d227441c8eb4277d33816cab193d..92efd59255e213b13d872e90cf6f106b0db8f830 100644 (file)
@@ -708,7 +708,7 @@ content_fetch_on_complete (GObject        *object,
   OstreeObjectType objtype;
   gboolean free_fetch_data = TRUE;
 
-  if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error))
+  if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
     goto out;
 
   ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
@@ -841,7 +841,7 @@ meta_fetch_on_complete (GObject           *object,
   g_debug ("fetch of %s%s complete", checksum_obj,
            fetch_data->is_detached_meta ? " (detached)" : "");
 
-  if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error))
+  if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
     {
       if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
         {
@@ -981,7 +981,7 @@ static_deltapart_fetch_on_complete (GObject           *object,
 
   g_debug ("fetch static delta part %s complete", fetch_data->expected_checksum);
 
-  if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error))
+  if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
     goto out;
 
   fd = openat (_ostree_fetcher_get_dfd (fetcher), temp_path, O_RDONLY | O_CLOEXEC);
@@ -1380,13 +1380,12 @@ enqueue_one_object_request (OtPullData        *pull_data,
   else
     expected_max_size = 0;
 
-  _ostree_fetcher_request_async (pull_data->fetcher, mirrorlist,
-                                 obj_subpath, OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL,
-                                 expected_max_size,
-                                 is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
-                                 : OSTREE_REPO_PULL_CONTENT_PRIORITY,
-                                 pull_data->cancellable,
-                                 is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
+  _ostree_fetcher_request_to_tmpfile (pull_data->fetcher, mirrorlist,
+                                      obj_subpath, expected_max_size,
+                                      is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
+                                      : OSTREE_REPO_PULL_CONTENT_PRIORITY,
+                                      pull_data->cancellable,
+                                      is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
 }
 
 static gboolean
@@ -1734,14 +1733,13 @@ process_one_static_delta (OtPullData   *pull_data,
         }
       else
         {
-          _ostree_fetcher_request_async (pull_data->fetcher,
-                                         pull_data->content_mirrorlist,
-                                         deltapart_path, OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL,
-                                         size,
-                                         OSTREE_FETCHER_DEFAULT_PRIORITY,
-                                         pull_data->cancellable,
-                                         static_deltapart_fetch_on_complete,
-                                         fetch_data);
+          _ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
+                                              pull_data->content_mirrorlist,
+                                              deltapart_path, size,
+                                              OSTREE_FETCHER_DEFAULT_PRIORITY,
+                                              pull_data->cancellable,
+                                              static_deltapart_fetch_on_complete,
+                                              fetch_data);
           pull_data->n_outstanding_deltapart_fetches++;
         }
     }